{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%run startup.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "application/javascript": [
       "$.getScript('./assets/js/ipython_notebook_toc.js')"
      ],
      "text/plain": [
       "<IPython.core.display.Javascript object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "%%javascript\n",
    "$.getScript('./assets/js/ipython_notebook_toc.js')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# A Decision Tree of Observable Operators\n",
    "\n",
    "## Part 2: Combining Observables\n",
    "\n",
    "> source: http://reactivex.io/documentation/operators.html#tree.  \n",
    "> (transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, [axiros](http://www.axiros.com))  \n",
    "\n",
    "**This tree can help you find the ReactiveX Observable operator you’re looking for.**  \n",
    "See [Part 1](./A%20Decision%20Tree%20of%20Observable%20Operators.%20Part%20I%20-%20Creation.ipynb) for Usage and Output Instructions.  \n",
    "We also require acquaintance with the [marble diagrams](./Marble%20Diagrams.ipynb) feature of RxPY.\n",
    "\n",
    "[This](http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html) is a helpful accompanying read.  \n",
    "<h2 id=\"tocheading\">Table of Contents</h2>\n",
    "<div id=\"toc\"></div>\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to create an Observable by **combining** other Observables"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emitting all of the items from all of the Observables in whatever order they are received: **[merge / merge_all](http://reactivex.io/documentation/operators/merge.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== merge ==========\n",
      "\n",
      "module rx.linq.observable.merge\n",
      "@extensionclassmethod(Observable)  # noqa\n",
      "def merge(cls, *args):\n",
      "    Merges all the observable sequences into a single observable\n",
      "    sequence. The scheduler is optional and if not specified, the\n",
      "    immediate scheduler is used.\n",
      "\n",
      "    1 - merged = rx.Observable.merge(xs, ys, zs)\n",
      "    2 - merged = rx.Observable.merge([xs, ys, zs])\n",
      "    3 - merged = rx.Observable.merge(scheduler, xs, ys, zs)\n",
      "    4 - merged = rx.Observable.merge(scheduler, [xs, ys, zs])\n",
      "\n",
      "    Returns the observable sequence that merges the elements of the\n",
      "    observable sequences.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   2.3     M New subscription on stream 276542077\n",
      "   3.0     M [next]    0.5: 0\n",
      "   3.2     M [next]    0.7: 1\n",
      "   3.4     M [err ]    0.9: integer division or modulo by zero\n",
      "   5.4    T5 [next]    1.1: a\n",
      "\n",
      "   5.9    T5 [next]    1.6: b   4.2     M New subscription on stream 276542101\n",
      "\n",
      "   6.0    T5 [next]    1.7: c\n",
      "   6.5    T5 [next]    2.2: 0\n",
      "   6.7    T5 [next]    2.4: 1\n",
      "   6.8    T5 [err ]    2.5: integer division or modulo by zero\n"
     ]
    }
   ],
   "source": [
    "reset_start_time(O.merge)\n",
    "l = []\n",
    "def excepting_f(obs):\n",
    "    for i in range(10):\n",
    "        l.append(1)\n",
    "        obs.on_next(1 / (3 - len(l)))\n",
    "stream1 = O.from_(('a', 'b', 'c'))\n",
    "stream2 = O.create(excepting_f)\n",
    "# merged stream stops in any case at first exception!\n",
    "# No guarantee of order of those immediately created streams !\n",
    "d = subs(stream1.merge(stream2))\n",
    "l = []\n",
    "d = subs(O.merge(new_thread_scheduler, [stream1, stream2]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 167,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== merge_all ==========\n",
      "\n",
      "function merge_all of module rx.linq.observable.merge:\n",
      "Merges an observable sequence of observable sequences into an\n",
      "    observable sequence.\n",
      "\n",
      "    Returns the observable sequence that merges the elements of the inner\n",
      "    sequences.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.0     M New subscription on stream 276411209\n",
      "   2.5     M [next]    1.2: 1\n",
      "   3.6     M [next]    2.4: 2\n",
      "   3.9     M [next]    2.6: 1\n",
      "   4.5     M [next]    3.2: 3\n",
      "   4.9     M [next]    3.6: 2\n",
      "   5.0     M [next]    3.8: 1\n",
      "   5.6     M [next]    4.4: 3\n",
      "   5.9     M [next]    4.6: 2\n",
      "   6.5     M [next]    5.2: 3\n",
      "   6.8     M [cmpl]    5.5: fin\n",
      "\n",
      "   7.3     M New subscription on stream 276483801\n",
      "  19.0 T1046 [next]   11.7: 0 (streams with time delays between events)\n",
      "  19.9 T1047 [next]   12.6: 0 (streams with time delays between events)\n",
      "  21.1 T1048 [next]   13.8: 0 (streams with time delays between events)\n",
      "  30.2 T1049 [next]   22.9: 1 (streams with time delays between events)\n",
      "  31.6 T1050 [next]   24.4: 1 (streams with time delays between events)\n",
      "  32.4 T1051 [next]   25.1: 1 (streams with time delays between events)\n",
      "  41.7 T1052 [next]   34.4: 2 (streams with time delays between events)\n",
      "  42.4 T1053 [next]   35.1: 2 (streams with time delays between events)\n",
      "  43.3 T1054 [next]   36.0: 2 (streams with time delays between events)\n",
      "  43.8 T1054 [cmpl]   36.5: fin (streams with time delays between events)\n"
     ]
    }
   ],
   "source": [
    "rst(O.merge_all, title='merge_all')\n",
    "meta = O.repeat(O.from_((1, 2, 3)), 3)\n",
    "# no guarantee of order, immediately created:\n",
    "d = subs(meta.merge_all())\n",
    "# Introducing delta ts:\n",
    "d = subs(O.repeat(O.timer(10, 10)\\\n",
    "                  .take(3), 3)\\\n",
    "                  .merge_all(),\n",
    "         name='streams with time delays between events')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emitting all of the items from all of the Observables, one Observable at a time: **[concat](http://reactivex.io/documentation/operators/concat.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 168,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function concat of module rx.linq.observable.concat:\n",
      "Concatenates all the observable sequences.\n",
      "\n",
      "    1 - res = Observable.concat(xs, ys, zs)\n",
      "    2 - res = Observable.concat([xs, ys, zs])\n",
      "\n",
      "    Returns an observable sequence that contains the elements of each given\n",
      "    sequence, in sequential order.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.9     M New subscription on stream 278620693\n",
      "   1.4     M [next]    0.3: 1\n",
      "   1.6     M [next]    0.6: 2\n",
      "   1.8     M [cmpl]    0.8: fin\n",
      "\n",
      "   2.2     M New subscription on stream 278620893\n",
      "   2.7     M [next]    0.4: 3\n",
      "   2.8     M [next]    0.6: 4\n",
      "   3.0     M [cmpl]    0.7: fin\n",
      "\n",
      "   3.4     M New subscription on stream 276406121\n",
      "   3.8     M [next]    0.3: 3\n",
      "   4.1     M [next]    0.5: 4\n",
      "   4.4     M [next]    0.8: 1\n",
      "   4.6     M [next]    1.1: 2\n",
      "   5.0     M [cmpl]    1.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.concat)\n",
    "s1 = O.from_((1, 2))\n",
    "s2 = O.from_((3, 4))\n",
    "# while normal subscriptions work as expected...\n",
    "d1, d2 = subs(s1), subs(s2)\n",
    "# ... another one can have the order reversed\n",
    "d = subs(O.concat([s2, s1]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 169,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "   0.9     M New subscription on stream 276485909\n",
      "\n",
      "   3.8     M New subscription on stream 276485905\n",
      "   5.8     M main thread sleeping 1s\n",
      "  12.0 T1058 [next]   10.9: 1 (A)\n",
      " 219.2 T1066 [next]  215.4: a (B)\n",
      " 226.2 T1059 [next]  225.1: 2 (A)\n",
      " 325.2 T1068 [next]  321.4: b (B)\n",
      " 439.1 T1069 [next]  435.3: c (B)\n",
      " 446.4 T1070 [cmpl]  442.5: fin (B)\n",
      " 536.3 T1061 [next]  535.2: 3 (A)\n",
      " 547.5 T1063 [cmpl]  546.3: fin (A)\n",
      "\n",
      "\n",
      "========== Concatenating in reverse order ==========\n",
      "\n",
      "\n",
      "   0.5     M New subscription on stream 276308601\n",
      " 216.5 T1074 [next]  215.7: a (C)\n",
      " 327.7 T1075 [next]  326.9: b (C)\n",
      " 436.7 T1077 [next]  436.0: c (C)\n",
      " 456.4 T1082 [next]  455.7: 1 (C)\n",
      " 670.3 T1083 [next]  669.5: 2 (C)\n",
      " 976.9 T1085 [next]  976.2: 3 (C)\n",
      " 990.9 T1087 [cmpl]  990.1: fin (C)\n"
     ]
    }
   ],
   "source": [
    "rst()\n",
    "# See the marbles notebook:\n",
    "s1 = O.from_marbles('1--2---3|').to_blocking()\n",
    "s2 = O.from_marbles('--a-b-c|' ).to_blocking()\n",
    "d = (subs(s1, name='A'),\n",
    "     subs(s2, name='B'))\n",
    "rst(title=\"Concatenating in reverse order\", sleep=1)\n",
    "d = subs(O.concat([s2, s1]), name='C')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... by combining the items from two or more Observables sequentially to come up with new items to emit"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... whenever *each* of the Observables has emitted a new item **[zip / zip_list](http://reactivex.io/documentation/operators/zip.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 176,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function zip of module rx.linq.observable.zip:\n",
      "Merges the specified observable sequences into one observable\n",
      "    sequence by using the selector function whenever all of the observable\n",
      "    sequences have produced an element at a corresponding index.\n",
      "\n",
      "    The last element in the arguments must be a function to invoke for each\n",
      "    series of elements at corresponding indexes in the sources.\n",
      "\n",
      "    Arguments:\n",
      "    args -- Observable sources.\n",
      "\n",
      "    Returns an observable {Observable} sequence containing the result of\n",
      "    combining elements of the sources using the specified result selector\n",
      "    function.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.8     M New subscription on stream 276463149\n",
      "   3.0     M [next]    2.2: 0 : 1 : 2\n",
      "   3.5     M [next]    2.6: 1 : 2 : 3\n",
      "   3.9     M [next]    3.0: 2 : 3 : 4\n",
      "   4.3     M [cmpl]    3.4: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.zip)\n",
    "s1 = O.range(0, 5)\n",
    "d = subs(O.zip(s1, s1.skip(1), s1.skip(2), lambda s1, s2, s3: '%s : %s : %s' % (s1, s2, s3)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 179,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function zip_list of module rx.linq.observable.ziparray:\n",
      "Merge the specified observable sequences into one observable\n",
      "    sequence by emitting a list with the elements of the observable\n",
      "    sequences at corresponding indexes.\n",
      "\n",
      "    Keyword arguments:\n",
      "    :param Observable cls: Class\n",
      "    :param Tuple args: Observable sources.\n",
      "\n",
      "    :return: Returns an observable sequence containing lists of\n",
      "    elements at corresponding indexes.\n",
      "    :rtype: Observable\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.9     M New subscription on stream 276480037\n",
      "   2.9     M [next]    1.8: [0, 1, 2]\n",
      "   3.3     M [next]    2.2: [1, 2, 3]\n",
      "   3.9     M [next]    2.8: [2, 3, 4]\n",
      "   4.4     M [cmpl]    3.3: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.zip_list) # alias: zip_array\n",
    "s1 = O.range(0, 5)\n",
    "d = subs(O.zip_list(s1, s1.skip(1), s1.skip(2)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... whenever *any* of the Observables has emitted a new item **[combine_latest](http://reactivex.io/documentation/operators/combinelatest.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== combine_latest ==========\n",
      "\n",
      "function combine_latest of module rx.linq.observable.combinelatest:\n",
      "Merges the specified observable sequences into one observable\n",
      "    sequence by using the selector function whenever any of the\n",
      "    observable sequences produces an element.\n",
      "\n",
      "    1 - obs = Observable.combine_latest(obs1, obs2, obs3,\n",
      "                                       lambda o1, o2, o3: o1 + o2 + o3)\n",
      "    2 - obs = Observable.combine_latest([obs1, obs2, obs3],\n",
      "                                        lambda o1, o2, o3: o1 + o2 + o3)\n",
      "\n",
      "    Returns an observable sequence containing the result of combining\n",
      "    elements of the sources using the specified result selector\n",
      "    function.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.0     M New subscription on stream 276805405\n",
      "   3.0     M main thread sleeping 1s\n",
      " 157.1  T195 [next]  156.1: First : 0, Second: 0\n",
      " 211.6  T196 [next]  210.6: First : 1, Second: 0\n",
      " 312.5  T197 [next]  311.6: First : 1, Second: 1\n",
      " 316.0  T198 [next]  315.0: First : 2, Second: 1\n",
      " 421.2  T200 [next]  420.2: First : 3, Second: 1\n",
      " 465.7  T199 [next]  464.7: First : 3, Second: 2\n",
      " 465.9  T199 [cmpl]  464.9: fin\n",
      "\n",
      "\n",
      "========== For comparison: merge ==========\n",
      "\n",
      "\n",
      "   1.0     M New subscription on stream 276157517\n",
      " 104.0  T204 [next]  103.0: First : 0\n",
      " 157.6  T203 [next]  156.5: Second: 0\n",
      " 207.3  T205 [next]  206.2: First : 1\n",
      " 311.4  T207 [next]  310.4: First : 2 311.7  T206 [next]  310.7: Second: 1\n",
      "\n",
      " 417.5  T208 [next]  416.5: First : 3\n",
      " 417.8  T208 [cmpl]  416.7: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.combine_latest, title='combine_latest')\n",
    "s1 = O.interval(100).map(lambda i: 'First : %s' % i)\n",
    "s2 = O.interval(150).map(lambda i: 'Second: %s' % i)\n",
    "# the start is interesting, both must have emitted, so it starts at 150ms with 0/0:\n",
    "d = subs(s1.combine_latest(s2, lambda s1, s2: '%s, %s' % (s1, s2)).take(6))\n",
    "\n",
    "rst(title='For comparison: merge', sleep=1)\n",
    "d = subs(s1.merge(s2).take(6))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... whenever *the first* of the Observables has emitted a new item **[with_latest_from](http://reactivex.io/documentation/operators/combinelatest.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== with_latest_from ==========\n",
      "\n",
      "function with_latest_from of module rx.linq.observable.withlatestfrom:\n",
      "Merges the specified observable sequences into one observable sequence\n",
      "    by using the selector function only when the first observable sequence\n",
      "    produces an element. The observables can be passed either as seperate\n",
      "    arguments or as a list.\n",
      "\n",
      "    1 - obs = Observable.with_latest_from(obs1, obs2, obs3,\n",
      "                                       lambda o1, o2, o3: o1 + o2 + o3)\n",
      "    2 - obs = Observable.with_latest_from([obs1, obs2, obs3],\n",
      "                                        lambda o1, o2, o3: o1 + o2 + o3)\n",
      "\n",
      "    Returns an observable sequence containing the result of combining\n",
      "    elements of the sources using the specified result selector function.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.0     M New subscription on stream 276799777\n",
      " 147.0  T170 [next]  146.0: First : 0, Second: 1\n",
      " 292.0  T174 [next]  290.9: First : 1, Second: 4\n",
      " 437.8  T178 [next]  436.7: First : 2, Second: 7\n",
      " 581.7  T182 [next]  580.7: First : 3, Second: 9\n",
      " 723.3  T185 [next]  722.2: First : 4, Second: 12\n",
      " 867.0  T189 [next]  865.9: First : 5, Second: 15\n",
      " 867.4  T189 [cmpl]  866.3: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.with_latest_from, title='with_latest_from')\n",
    "s1 = O.interval(140).map(lambda i: 'First : %s' % i)\n",
    "s2 = O.interval(50) .map(lambda i: 'Second: %s' % i)\n",
    "d = subs(s1.with_latest_from(s2, lambda s1, s2: '%s, %s' % (s1, s2)).take(6))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... whenever an item is emitted by one Observable in a window defined by an item emitted by another **[join](http://reactivex.io/documentation/operators/join.html)**\n",
    "The join operator takes four parameters:\n",
    "\n",
    "1. the second Observable to combine with the source Observable\n",
    "1. a function that accepts an item from the source Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the second Observable\n",
    "1. a function that accepts an item from the second Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the first Observable\n",
    "1. a function that accepts an item from the first Observable and an item from the second Observable and returns an item to be emitted by the Observable returned from join\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function join of module rx.linq.observable.join:\n",
      "Correlates the elements of two sequences based on overlapping\n",
      "    durations.\n",
      "\n",
      "    Keyword arguments:\n",
      "    right -- The right observable sequence to join elements for.\n",
      "    left_duration_selector -- A function to select the duration (expressed\n",
      "        as an observable sequence) of each element of the left observable\n",
      "        sequence, used to determine overlap.\n",
      "    right_duration_selector -- A function to select the duration (expressed\n",
      "        as an observable sequence) of each element of the right observable\n",
      "        sequence, used to determine overlap.\n",
      "    result_selector -- A function invoked to compute a result element for\n",
      "        any two overlapping elements of the left and right observable\n",
      "        sequences. The parameters passed to the function correspond with\n",
      "        the elements from the left and right source sequences for which\n",
      "        overlap occurs.\n",
      "\n",
      "    Return an observable sequence that contains result elements computed\n",
      "    from source elements that have an overlapping duration.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.2     M New subscription on stream 275960897\n",
      " 109.1  T449 [next]  107.8: First : 0 Second: 0\n",
      " 215.3  T453 [next]  214.0: First : 1 Second: 1\n",
      " 321.6  T457 [next]  320.4: First : 2 Second: 2\n",
      " 426.0  T461 [next]  424.8: First : 3 Second: 3\n",
      " 533.3  T465 [next]  532.1: First : 4 Second: 4\n",
      " 533.7  T465 [cmpl]  532.4: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.join)\n",
    "# this one is pretty timing critical and output seems swallowed with 2 threads (over)writing.\n",
    "# better try this with timer(0) on the console. Also the scheduler of the timers is critical,\n",
    "# try other O.timer schedulers...\n",
    "xs = O.interval(100).map(lambda i: 'First : %s' % i)\n",
    "ys = O.interval(101).map(lambda i: 'Second: %s' % i)\n",
    "d = subs(xs.join(ys, lambda _: O.timer(10), lambda _: O.timer(0), lambda x, y: '%s %s' % (x, y)).take(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... or, alternatively, **[group_join](http://reactivex.io/documentation/operators/join.html)**\n",
    "\n",
    "The groupJoin operator takes four parameters:\n",
    "\n",
    "1. the second Observable to combine with the source Observable\n",
    "1. a function that accepts an item from the source Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the second Observable\n",
    "1. a function that accepts an item from the second Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the first Observable\n",
    "1. a function that accepts an item from the first Observable and an Observable that emits items from the second Observable and returns an item to be emitted by the Observable returned from groupJoin\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== group_join ==========\n",
      "\n",
      "function group_join of module rx.linq.observable.groupjoin:\n",
      "Correlates the elements of two sequences based on overlapping\n",
      "    durations, and groups the results.\n",
      "\n",
      "    Keyword arguments:\n",
      "    right -- The right observable sequence to join elements for.\n",
      "    left_duration_selector -- A function to select the duration (expressed\n",
      "        as an observable sequence) of each element of the left observable\n",
      "        sequence, used to determine overlap.\n",
      "    right_duration_selector -- A function to select the duration (expressed\n",
      "        as an observable sequence) of each element of the right observable\n",
      "        sequence, used to determine overlap.\n",
      "    result_selector -- A function invoked to compute a result element for\n",
      "        any element of the left sequence with overlapping elements from the\n",
      "        right observable sequence. The first parameter passed to the\n",
      "        function is an element of the left sequence. The second parameter\n",
      "        passed to the function is an observable sequence with elements from\n",
      "        the right sequence that overlap with the left sequence's element.\n",
      "\n",
      "    Returns an observable sequence that contains result elements computed\n",
      "    from source elements that have an overlapping duration.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.3     M New subscription on stream 273932389\n",
      " 534.5  T500 [next]  533.0: First : 4 Second: 4\n",
      " 536.0  T501 [next]  534.5: First : 4 Second: 4\n",
      " 640.2  T504 [next]  638.8: First : 5 Second: 5\n",
      "18690.9 T1191 [next] 18689.5: First : 176 Second: 177\n",
      "19010.8 T1202 [next] 19009.4: First : 179 Second: 180\n",
      "19011.9 T1202 [cmpl] 19010.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.group_join, title='group_join')\n",
    "xs = O.interval(100).map(lambda i: 'First : %s' % i)\n",
    "ys = O.interval(100).map(lambda i: 'Second: %s' % i)\n",
    "d = subs(xs.group_join(ys,\n",
    "                       lambda _: O.timer(0),\n",
    "                       lambda _: O.timer(0),\n",
    "                       lambda x, yy: yy.select(lambda y: '%s %s' % (x, y))).merge_all().take(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... by means of Pattern and Plan intermediaries And/Then/When, **[and / then / when](http://reactivex.io/documentation/operators/and-then-when.html)**\n",
    "The combination of the And, Then, and When operators behaves much like the Zip operator, but they do so by means of intermediate data structures. And accepts two or more Observables and combines the emissions from each, one set at a time, into Pattern objects. Then operates on such Pattern objects, transforming them into a Plan. When in turn transforms these various Plan objects into emissions from an Observable.\n",
    "\n",
    "[details](http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html#AndThenWhen) \n",
    "The And/Then/When trio has more overloads that enable you to group an even greater number of sequences. They also allow you to provide more than one 'plan' (the output of the Then method). This gives you the Merge feature but on the collection of 'plans'. I would suggest playing around with them if this functionality is of interest to you. The verbosity of enumerating all of the combinations of these methods would be of low value. You will get far more value out of using them and discovering for yourself.\n",
    "\n",
    "As we delve deeper into the depths of what the Rx libraries provide us, we can see more practical usages for it. Composing sequences with Rx allows us to easily make sense of the multiple data sources a problem domain is exposed to. We can concatenate values or sequences together sequentially with StartWith, Concat and Repeat. We can process multiple sequences concurrently with Merge, or process a single sequence at a time with Amb and Switch. Pairing values with CombineLatest, Zip and the And/Then/When operators can simplify otherwise fiddly operations like our drag-and-drop examples and monitoring system status.\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 59,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "   1.1     M New subscription on stream 275998165\n",
      "1002.9 T1468 [next] 1001.8: \n",
      "\n",
      "Seconds : 0 from time: 1.00\n",
      "HalfSecs: 0 from time: 0.50\n",
      "10thS   : 0 from time: 0.11\n",
      "2008.3 T1477 [next] 2007.2: \n",
      "\n",
      "Seconds : 1 from time: 2.01\n",
      "HalfSecs: 1 from time: 1.01\n",
      "10thS   : 1 from time: 0.21\n",
      "3011.7 T1480 [next] 3010.6: \n",
      "\n",
      "Seconds : 2 from time: 3.01\n",
      "HalfSecs: 2 from time: 1.51\n",
      "10thS   : 2 from time: 0.32\n",
      "4014.4 T1483 [next] 4013.3: \n",
      "\n",
      "Seconds : 3 from time: 4.01\n",
      "HalfSecs: 3 from time: 2.01\n",
      "10thS   : 3 from time: 0.42\n",
      "5018.9 T1484 [next] 5017.8: \n",
      "\n",
      "Seconds : 4 from time: 5.02\n",
      "HalfSecs: 4 from time: 2.52\n",
      "10thS   : 4 from time: 0.52\n",
      "5019.9 T1484 [cmpl] 5018.8: fin\n"
     ]
    }
   ],
   "source": [
    "rst()\n",
    "# see the similarity to zip. \n",
    "ts = time.time()\n",
    "def _dt():\n",
    "    # giving us info when an element was created:\n",
    "    return 'from time: %.2f' % (time.time() - ts)\n",
    "one = O.interval(1000) .map(lambda i: 'Seconds : %s %s' % (i, _dt())).take(5)\n",
    "two = O.interval(500)  .map(lambda i: 'HalfSecs: %s %s' % (i, _dt())).take(5)\n",
    "three = O.interval(100).map(lambda i: '10thS   : %s %s' % (i, _dt())).take(5)\n",
    "\n",
    "z = O.when(\n",
    "   one          \\\n",
    "    .and_(two)  \\\n",
    "    .and_(three)\\\n",
    "    .then_do(lambda a, b, c: '\\n'.join(('', '', a, b, c))))\n",
    "\n",
    "# from the output you see that the result stream consists of elements built at each interval\n",
    "# (which is in the past for 'two' and 'three'),\n",
    "# buffered until the 1 second sequence 'one' advances a step. \n",
    "d = subs(z)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emitting the items from only the most-recently emitted of those Observables **[switch_latest](http://reactivex.io/documentation/operators/switch.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 71,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function switch_latest of module rx.linq.observable.switchlatest:\n",
      "Transforms an observable sequence of observable sequences into an\n",
      "    observable sequence producing values only from the most recent\n",
      "    observable sequence.\n",
      "\n",
      "    :returns: The observable sequence that at any point in time produces the\n",
      "    elements of the most recent inner observable sequence that has been\n",
      "    received.\n",
      "    :rtype: Observable\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.9     M New subscription on stream 275998049\n",
      "   1.8     M [next]    0.8: 0 (from stream nr 0)\n",
      "   2.9     M [next]    1.9: 1 (from stream nr 1)\n",
      "   3.5     M [next]    2.5: 2 (from stream nr 2)\n",
      "   3.9     M [next]    2.9: 3 (from stream nr 2)\n",
      "   4.0     M [next]    3.0: 4 (from stream nr 2)\n",
      "   4.2     M [cmpl]    3.2: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.switch_latest)\n",
    "s = O.range(0, 3).select(lambda x: O.range(x, 3)\\\n",
    "                                  # showing from which stream our current value comes:\n",
    "                                  .map(lambda v: '%s (from stream nr %s)' % (v, x)))\\\n",
    "                 .switch_latest()\n",
    "d = subs(s)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
